tutorials/026 - Amazon Timestream.ipynb (284 lines of code) (raw):
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"[AWS SDK for pandas](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 26 - Amazon Timestream"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating resources"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"\n",
"import pandas as pd\n",
"\n",
"import awswrangler as wr\n",
"\n",
"# Names of the Timestream resources used throughout this tutorial.\n",
"database = \"sampleDB\"\n",
"table_1 = \"sampleTable1\"\n",
"table_2 = \"sampleTable2\"\n",
"\n",
"wr.timestream.create_database(database)\n",
"\n",
"# Both tables get the same retention settings: 1 hour in memory, 1 day magnetic.\n",
"for table_name in (table_1, table_2):\n",
"    wr.timestream.create_table(database, table_name, memory_retention_hours=1, magnetic_retention_days=1)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write\n",
"\n",
"### Single measure WriteRecord"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Number of rejected records: 0\n"
]
}
],
"source": [
"# Three rows sharing one timestamp: `dim0`/`dim1` are passed as dimension columns\n",
"# and `measure` as the single measure column.\n",
"write_time = datetime.now()\n",
"df = pd.DataFrame(\n",
"    {\n",
"        \"time\": [write_time] * 3,\n",
"        \"dim0\": [\"foo\", \"boo\", \"bar\"],\n",
"        \"dim1\": [1, 2, 3],\n",
"        \"measure\": [1.0, 1.1, 1.2],\n",
"    }\n",
")\n",
"\n",
"dimension_columns = [\"dim0\", \"dim1\"]\n",
"rejected_records = wr.timestream.write(\n",
"    df=df,\n",
"    database=database,\n",
"    table=table_1,\n",
"    time_col=\"time\",\n",
"    measure_col=\"measure\",\n",
"    dimensions_cols=dimension_columns,\n",
")\n",
"\n",
"# `write` returns the records it could not write; report how many there were.\n",
"print(f\"Number of rejected records: {len(rejected_records)}\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"### Multi measure WriteRecord"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Three rows sharing one timestamp; passing a list to `measure_col` writes the\n",
"# three measure columns together, with `tag` as the only dimension.\n",
"write_time = datetime.now()\n",
"df = pd.DataFrame(\n",
"    {\n",
"        \"time\": [write_time] * 3,\n",
"        \"measure_1\": [\"10\", \"20\", \"30\"],\n",
"        \"measure_2\": [\"100\", \"200\", \"300\"],\n",
"        \"measure_3\": [\"1000\", \"2000\", \"3000\"],\n",
"        \"tag\": [\"tag123\", \"tag456\", \"tag789\"],\n",
"    }\n",
")\n",
"\n",
"measure_columns = [\"measure_1\", \"measure_2\", \"measure_3\"]\n",
"rejected_records = wr.timestream.write(\n",
"    df=df,\n",
"    database=database,\n",
"    table=table_2,\n",
"    time_col=\"time\",\n",
"    measure_col=measure_columns,\n",
"    dimensions_cols=[\"tag\"],\n",
")\n",
"\n",
"# `write` returns the records it could not write; report how many there were.\n",
"print(f\"Number of rejected records: {len(rejected_records)}\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>time</th>\n",
" <th>measure_value::double</th>\n",
" <th>dim0</th>\n",
" <th>dim1</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2020-12-08 19:15:32.468</td>\n",
" <td>1.0</td>\n",
" <td>foo</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2020-12-08 19:15:32.468</td>\n",
" <td>1.2</td>\n",
" <td>bar</td>\n",
" <td>3</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2020-12-08 19:15:32.468</td>\n",
" <td>1.1</td>\n",
" <td>boo</td>\n",
" <td>2</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" time measure_value::double dim0 dim1\n",
"0 2020-12-08 19:15:32.468 1.0 foo 1\n",
"1 2020-12-08 19:15:32.468 1.2 bar 3\n",
"2 2020-12-08 19:15:32.468 1.1 boo 2"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Read back the three most recent rows of the single-measure table.\n",
"latest_rows_sql = (\n",
"    f'SELECT time, measure_value::double, dim0, dim1 FROM \"{database}\".\"{table_1}\" ORDER BY time DESC LIMIT 3'\n",
")\n",
"\n",
"# Keep the call as the last expression so the resulting DataFrame is displayed.\n",
"wr.timestream.query(latest_rows_sql)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Unload"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# The measure written to this table is exposed as `measure_value::double` (the same\n",
"# column the Query section selects), so it is selected under that name and aliased\n",
"# to a plain `measure_value` column for the unloaded data.\n",
"# NOTE: the `path` below looks like a placeholder bucket - point it at an S3\n",
"# location you can write to before running this cell.\n",
"df = wr.timestream.unload(\n",
"    sql=f'SELECT time, measure_value::double AS measure_value, dim0, dim1 FROM \"{database}\".\"{table_1}\"',\n",
"    path=\"s3://bucket/extracted_parquet_files/\",\n",
"    partition_cols=[\"dim1\"],\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Deleting resources"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# Remove both tables first, then the database that contained them.\n",
"for table_name in (table_1, table_2):\n",
"    wr.timestream.delete_table(database, table_name)\n",
"\n",
"wr.timestream.delete_database(database)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}